package com.cantor.consumer.handler;

import cn.hutool.core.util.ObjectUtil;
import com.cantor.consumer.future.FuturesKeeper;
import com.cantor.core.message.CantorResponseMessage;
import com.cantor.core.pool.CantorExecutorPool;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;

@Slf4j
@ChannelHandler.Sharable
public class CantorResponseMessageHandler extends SimpleChannelInboundHandler<CantorResponseMessage> {

    // 收到服务端响应, 将FuturesKeeper中指定sequenceId的CompletableFuture完成.
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, CantorResponseMessage res) throws Exception {
        // Channel收发都使用同一个信道, 因此这里的操作尽量改成异步, 防止资源占用.
        CantorExecutorPool.execute(()->{
            log.debug("{} <<< Response <<< {}",ctx.channel().localAddress(),ctx.channel().remoteAddress());
            try {
                // 检出CompletableFuture对象
                CompletableFuture future = FuturesKeeper.checkout(res.getSequenceId());
                // 如果是失败的回应
                Exception exceptionValue = res.getExceptionValue();
                if (ObjectUtil.isNotNull(exceptionValue)) {
                    future.completeExceptionally(exceptionValue);
                    return;
                }
                // 成功
                future.complete(res.getReturnValue());
            } finally {
                // 销毁
                ReferenceCountUtil.release(res);
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        log.error("客户端处理CantorResponse请求时发生错误");
    }

}
